﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Gateway;
using System.IO;
using System.Threading;
using RabbitMQ.Gateway.Converter;

namespace RabbitMQ.Utils
{
    /// <summary>
    /// Static entry point for publishing messages through the gateway publishers
    /// created by <see cref="GatewayFactory"/>. Publisher lists are cached per
    /// publisher configuration name and reused across calls.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the publisher cache is a plain <see cref="Dictionary{TKey,TValue}"/>
    /// and the cached lists are reordered in place without any synchronization.
    /// Confirm that callers do not invoke these methods concurrently, or add locking.
    /// </remarks>
    public class RQProducer
    {
        private static readonly log4net.ILog log = log4net.LogManager.GetLogger(typeof(RQProducer));

        /// <summary>
        /// Process-wide gateway factory used to create publisher lists.
        /// </summary>
        private static readonly GatewayFactory _gFactory = new GatewayFactory();

        /// <summary>
        /// Process-wide cache of publishers (channels), keyed by publisher configuration name.
        /// </summary>
        private static readonly Dictionary<string, List<IPublisher>> _gpublisher = new Dictionary<string, List<IPublisher>>();

        /// <summary>
        /// Shared converter handed to the factory when publishers are created.
        /// </summary>
        private static readonly MessageProcessor processor = new MessageProcessor();

        /// <summary>
        /// Value of the "MQEnable" app setting; defaults to "false" when the setting is absent.
        /// </summary>
        private static readonly string MqEnable = System.Configuration.ConfigurationManager.AppSettings["MQEnable"] ?? "false";

        /// <summary>
        /// Returns the cached publisher list for <paramref name="publisher"/>, or asks the
        /// factory for a new list when nothing usable is cached or a reconnect is requested.
        /// </summary>
        /// <param name="publisher">Publisher configuration name used as the cache key.</param>
        /// <param name="mConvert">Converter whose <c>ConvertObjectToMessage</c> is passed to the factory.</param>
        /// <param name="Reconnction">When true, bypasses the cache and forwards the flag to the factory.</param>
        /// <returns>The publisher list obtained from the cache or from the factory.</returns>
        private static List<IPublisher> GetPublisher(string publisher, IConvertToMessage mConvert, bool Reconnction = false)
        {
            List<IPublisher> cached;
            if (!Reconnction
                && _gpublisher.TryGetValue(publisher, out cached)
                && cached != null
                && cached.Count > 0)
            {
                return cached;
            }

            List<IPublisher> pubs = _gFactory.GetPublisherList(publisher, mConvert.ConvertObjectToMessage, Reconnction);

            // The indexer both adds a new entry and replaces an existing one.
            _gpublisher[publisher] = pubs;
            return pubs;
        }

        /// <summary>
        /// Finds the first publisher whose channel is open and moves it to the front of the
        /// list so that subsequent calls try it first. If every publisher in a non-empty list
        /// is closed, one reconnect attempt is made.
        /// </summary>
        /// <param name="PublisherStr">Publisher configuration name.</param>
        /// <param name="mConvert">Converter forwarded to <see cref="GetPublisher"/>.</param>
        /// <param name="Reconnction">True when this call is already the reconnect attempt.</param>
        /// <returns>A publisher with an open channel, or <c>null</c> when none is available.</returns>
        private static IPublisher GetPublisherAndSort(string PublisherStr, IConvertToMessage mConvert, bool Reconnction = false)
        {
            List<IPublisher> pubs = GetPublisher(PublisherStr, mConvert, Reconnction);
            if (pubs == null)
            {
                return null;
            }

            for (int i = 0; i < pubs.Count; i++)
            {
                if (!pubs[i].ChannelIsOpen)
                {
                    continue;
                }

                if (i != 0)
                {
                    // The sending publisher is not the first one: swap it into slot 0.
                    IPublisher open = pubs[i];
                    pubs[i] = pubs[0];
                    pubs[0] = open;

                    System.Diagnostics.Debug.WriteLine(string.Format("...发送者Publisher={1}从第{0}个通道发送", i, PublisherStr));
                }
                return pubs[0];
            }

            // Every publisher in the list is closed: retry once with a reconnect.
            if (pubs.Count > 0 && !Reconnction)
            {
                System.Diagnostics.Debug.WriteLine(string.Format("...发送者Publisher={0}进行了重连尝试....", PublisherStr));
                return GetPublisherAndSort(PublisherStr, mConvert, true);
            }

            return null;
        }

        /// <summary>
        /// Indicates whether publishing is switched off by the "MQEnable" app setting.
        /// </summary>
        private static bool IsMqDisabled()
        {
            // Configuration flag, not user-facing text: compare ordinally.
            return MqEnable.Equals("false", StringComparison.OrdinalIgnoreCase);
        }

        /// <summary>
        /// Publishes through the preferred publisher and, if that publish reports failure,
        /// selects a publisher again and retries once with the same payload.
        /// </summary>
        /// <param name="publisherStr">Publisher configuration name.</param>
        /// <param name="ruteKey">Routing key passed through to the log message.</param>
        /// <param name="messageForLog">Message value written to the retry log entry.</param>
        /// <param name="publish">Delegate performing the actual publish on a given publisher.</param>
        /// <returns>True when either attempt reports success; otherwise false.</returns>
        private static bool PublishWithFailover(string publisherStr, string ruteKey, object messageForLog, Func<IPublisher, bool> publish)
        {
            IPublisher pub = GetPublisherAndSort(publisherStr, processor);
            if (pub == null)
            {
                log.FatalFormat("No open channel available for Publisher={0}, RoutingKey={1}; message was not sent.", publisherStr, ruteKey);
                return false;
            }

            if (publish(pub))
            {
                return true;
            }

            // Guards against the node in use dropping at the moment of sending
            // while other nodes are still available.
            pub = GetPublisherAndSort(publisherStr, processor);
            if (pub == null)
            {
                log.FatalFormat("No open channel available for retry on Publisher={0}, RoutingKey={1}; message was not sent.", publisherStr, ruteKey);
                return false;
            }

            bool bl = publish(pub);
            log.WarnFormat("......节点切换后,重新推送了消息：Exchange={0},RoutingKey={1},消息={2} 【{3}】", pub.Exchange, ruteKey, messageForLog, bl ? "成功" : "");
            return bl;
        }

        /// <summary>
        /// Publishes a text message using the cached publishers for <paramref name="PublisherStr"/>.
        /// </summary>
        /// <param name="message">Text to publish.</param>
        /// <param name="PublisherStr">Publisher configuration name.</param>
        /// <param name="RuteKey">Routing key; per the original note, when empty the route key configured for the publisher is used.</param>
        /// <returns>
        /// True when the message was published or when MQ is disabled by configuration;
        /// false when publishing failed or an exception was caught and logged.
        /// </returns>
        public static bool EfficientSend(string message, string PublisherStr = "", string RuteKey = "")
        {
            try
            {
                // MQ not enabled: nothing is sent and the call counts as successful.
                if (IsMqDisabled())
                {
                    return true;
                }

                return PublishWithFailover(PublisherStr, RuteKey, message, p => p.Publish(message, RuteKey));
            }
            catch (Exception ex)
            {
                log.Fatal(ex);
                return false;
            }
        }

        /// <summary>
        /// Publishes an object. Strings are forwarded to the text overload; any other object
        /// is converted to bytes with <c>Serialization.ObjectToByte</c> before publishing.
        /// </summary>
        /// <param name="message">Object to publish (per the original note, its class must be marked for binary serialization).</param>
        /// <param name="PublisherStr">Publisher configuration name.</param>
        /// <param name="RuteKey">Routing key; per the original note, when empty the route key configured for the publisher is used.</param>
        /// <returns>
        /// True when the message was published or when MQ is disabled by configuration;
        /// false when publishing failed or an exception was caught and logged.
        /// </returns>
        public static bool EfficientSend(object message, string PublisherStr = "", string RuteKey = "")
        {
            try
            {
                // MQ not enabled: nothing is sent and the call counts as successful.
                if (IsMqDisabled())
                {
                    return true;
                }

                string text = message as string;
                if (text != null)
                {
                    return EfficientSend(text, PublisherStr, RuteKey);
                }

                byte[] bytes = Serialization.ObjectToByte(message);

                // Both the first attempt and the retry send the same serialized bytes.
                return PublishWithFailover(PublisherStr, RuteKey, message, p => p.Publish(bytes, RuteKey));
            }
            catch (Exception ex)
            {
                log.Fatal(ex);
                return false;
            }
        }
    }
}
